GatewayServer服务,接收MPNS的消息种类如下:1
2//GatewayServer#init()
messageDispatcher.register(Command.GATEWAY_PUSH, () -> new GatewayPushHandler(mPushServer.getPushCenter()));
处理MPNS来的消息:1
2
3
4
5
6
7
8
9
10
11
12
13
14public final class GatewayPushHandler extends BaseMessageHandler<GatewayPushMessage> {
private final PushCenter pushCenter;
public GatewayPushHandler(PushCenter pushCenter) {
this.pushCenter = pushCenter;
}
public GatewayPushMessage decode(Packet packet, Connection connection) {
return new GatewayPushMessage(packet, connection);
}
public void handle(GatewayPushMessage message) {
pushCenter.push(message);
}
}
网关服务启动类 GatewayServer.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131public final class GatewayServer extends NettyTCPServer {
private ServerChannelHandler channelHandler;
private ConnectionManager connectionManager;
private MessageDispatcher messageDispatcher;
private GlobalChannelTrafficShapingHandler trafficShapingHandler;
private ScheduledExecutorService trafficShapingExecutor;
private MPushServer mPushServer;
public GatewayServer(MPushServer mPushServer) {
super(gateway_server_port, gateway_server_bind_ip);
this.mPushServer = mPushServer;
this.messageDispatcher = new MessageDispatcher();
this.connectionManager = new ServerConnectionManager(false);
this.channelHandler = new ServerChannelHandler(false, connectionManager, messageDispatcher);
}
public void init() {
super.init();
messageDispatcher.register(Command.GATEWAY_PUSH, () -> new GatewayPushHandler(mPushServer.getPushCenter()));
if (CC.mp.net.traffic_shaping.gateway_server.enabled) {//启用流量整形,限流
trafficShapingExecutor = Executors.newSingleThreadScheduledExecutor(new NamedPoolThreadFactory(T_TRAFFIC_SHAPING));
trafficShapingHandler = new GlobalChannelTrafficShapingHandler(
trafficShapingExecutor,
write_global_limit, read_global_limit,
write_channel_limit, read_channel_limit,
check_interval);
}
}
public void stop(Listener listener) {
super.stop(listener);
if (trafficShapingHandler != null) {
trafficShapingHandler.release();
trafficShapingExecutor.shutdown();
}
if (connectionManager != null) {
connectionManager.destroy();
}
}
protected String getBossThreadName() {
return ThreadNames.T_GATEWAY_BOSS;
}
protected String getWorkThreadName() {
return ThreadNames.T_GATEWAY_WORKER;
}
protected int getIoRate() {
return 100;
}
protected int getWorkThreadNum() {
return CC.mp.thread.pool.gateway_server_work;
}
protected void initPipeline(ChannelPipeline pipeline) {
super.initPipeline(pipeline);
if (trafficShapingHandler != null) {
pipeline.addFirst(trafficShapingHandler);
}
}
protected void initOptions(ServerBootstrap b) {
super.initOptions(b);
if (snd_buf.gateway_server > 0) b.childOption(ChannelOption.SO_SNDBUF, snd_buf.gateway_server);
if (rcv_buf.gateway_server > 0) b.childOption(ChannelOption.SO_RCVBUF, rcv_buf.gateway_server);
/**
* 这个坑其实也不算坑,只是因为懒,该做的事情没做。一般来讲我们的业务如果比较小的时候我们用同步处理,等业务到一定规模的时候,一个优化手段就是异步化。
* 异步化是提高吞吐量的一个很好的手段。但是,与异步相比,同步有天然的负反馈机制,也就是如果后端慢了,前面也会跟着慢起来,可以自动的调节。
* 但是异步就不同了,异步就像决堤的大坝一样,洪水是畅通无阻。如果这个时候没有进行有效的限流措施就很容易把后端冲垮。
* 如果一下子把后端冲垮倒也不是最坏的情况,就怕把后端冲的要死不活。
* 这个时候,后端就会变得特别缓慢,如果这个时候前面的应用使用了一些无界的资源等,就有可能把自己弄死。
* 那么现在要介绍的这个坑就是关于Netty里的ChannelOutboundBuffer这个东西的。
* 这个buffer是用在netty向channel write数据的时候,有个buffer缓冲,这样可以提高网络的吞吐量(每个channel有一个这样的buffer)。
* 初始大小是32(32个元素,不是指字节),但是如果超过32就会翻倍,一直增长。
* 大部分时候是没有什么问题的,但是在碰到对端非常慢(对端慢指的是对端处理TCP包的速度变慢,比如对端负载特别高的时候就有可能是这个情况)的时候就有问题了,
* 这个时候如果还是不断地写数据,这个buffer就会不断地增长,最后就有可能出问题了(我们的情况是开始吃swap,最后进程被linux killer干掉了)。
* 为什么说这个地方是坑呢,因为大部分时候我们往一个channel写数据会判断channel是否active,但是往往忽略了这种慢的情况。
*
* 那这个问题怎么解决呢?其实ChannelOutboundBuffer虽然无界,但是可以给它配置一个高水位线和低水位线,
* 当buffer的大小超过高水位线的时候对应channel的isWritable就会变成false,
* 当buffer的大小低于低水位线的时候,isWritable就会变成true。所以应用应该判断isWritable,如果是false就不要再写数据了。
* 高水位线和低水位线是字节数,默认高水位是64K,低水位是32K,我们可以根据我们的应用需要支持多少连接数和系统资源进行合理规划。
*/
if (gateway_server_low > 0 && gateway_server_high > 0) {
b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
gateway_server_low, gateway_server_high
));
}
}
public ChannelFactory<? extends ServerChannel> getChannelFactory() {
if (CC.mp.net.tcpGateway()) return super.getChannelFactory();
if (CC.mp.net.udtGateway()) return NioUdtProvider.BYTE_ACCEPTOR;
if (CC.mp.net.sctpGateway()) return NioSctpServerChannel::new;
return super.getChannelFactory();
}
public SelectorProvider getSelectorProvider() {
if (CC.mp.net.tcpGateway()) return super.getSelectorProvider();
if (CC.mp.net.udtGateway()) return NioUdtProvider.BYTE_PROVIDER;
if (CC.mp.net.sctpGateway()) return super.getSelectorProvider();
return super.getSelectorProvider();
}
public ChannelHandler getChannelHandler() {
return channelHandler;
}
public ConnectionManager getConnectionManager() {
return connectionManager;
}
public MessageDispatcher getMessageDispatcher() {
return messageDispatcher;
}
}
网关服务channel处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 .Sharable
public final class ServerChannelHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannelHandler.class);
private static final long profile_slowly_limit = CC.mp.monitor.profile_slowly_duration.toMillis();
private final boolean security; //是否启用加密
private final ConnectionManager connectionManager;
private final PacketReceiver receiver;
public ServerChannelHandler(boolean security, ConnectionManager connectionManager, PacketReceiver receiver) {
this.security = security;
this.connectionManager = connectionManager;
this.receiver = receiver;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
byte cmd = packet.cmd;
try {
Profiler.start("time cost on [channel read]: ", packet.toString());
Connection connection = connectionManager.get(ctx.channel());
LOGGER.debug("channelRead conn={}, packet={}", ctx.channel(), connection.getSessionContext(), msg);
connection.updateLastReadTime();
receiver.onReceive(packet, connection);
} finally {
Profiler.release();
if (Profiler.getDuration() > profile_slowly_limit) {
Logs.PROFILE.info("Read Packet[cmd={}] Slowly: \n{}", Command.toCMD(cmd), Profiler.dump());
}
Profiler.reset();
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Connection connection = connectionManager.get(ctx.channel());
Logs.CONN.error("client caught ex, conn={}", connection);
LOGGER.error("caught an ex, channel={}, conn={}", ctx.channel(), connection, cause);
ctx.close();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Logs.CONN.info("client connected conn={}", ctx.channel());
Connection connection = new NettyConnection();
connection.init(ctx.channel(), security);
connectionManager.add(connection);
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Connection connection = connectionManager.removeAndClose(ctx.channel());
EventBus.post(new ConnectionCloseEvent(connection));
Logs.CONN.info("client disconnected conn={}", connection);
}
}